package org.example.utils;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.util.AccumulatorV2;
import org.example.Entity.BusinessData;
import org.example.Entity.CategoryCountInfo;
import org.example.Entity.ProductIdToNameMapEnum;
import org.example.Entity.UserVisitAction;
import scala.Tuple2;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;




/**
 * Custom {@link AccumulatorV2} that counts click / order / pay actions per
 * product category. IN = one {@link UserVisitAction}; OUT = a map keyed by
 * (categoryId, actionType) holding the accumulated count.
 */
class CategoryCountAccumulator extends AccumulatorV2<UserVisitAction, Map<Tuple2<String, String>, Long>> implements Serializable {

    // (categoryId, actionType) -> number of occurrences seen so far.
    private Map<Tuple2<String, String>, Long> map1 = new HashMap<>();

    /** An accumulator is "zero" when no counts have been recorded yet. */
    @Override
    public boolean isZero() {
        return map1.isEmpty();
    }

    /** Returns an independent copy; Spark calls this when shipping the accumulator to executors. */
    @Override
    public AccumulatorV2<UserVisitAction, Map<Tuple2<String, String>, Long>> copy() {
        CategoryCountAccumulator newAcc = new CategoryCountAccumulator();
        newAcc.map1 = new HashMap<>(this.map1);
        return newAcc;
    }

    /** Clears all accumulated counts, returning the accumulator to its zero state. */
    @Override
    public void reset() {
        map1.clear();
    }

    /**
     * Records a single user action within one partition.
     * Exactly one of click / order / pay is expected per action (the unused
     * fields carry -1 or the literal string "null" in the source data), so the
     * branches are mutually exclusive.
     */
    @Override
    public void add(UserVisitAction action) {
        if (action.getClick_category_id() != -1) {
            // Click action: a single category id.
            map1.merge(new Tuple2<>(action.getClick_category_id() + "", "click"), 1L, Long::sum);
        }
        else if (!"null".equals(action.getOrder_category_ids())) {
            // Order action: comma-separated list of category ids.
            for (String id : action.getOrder_category_ids().split(",")) {
                map1.merge(new Tuple2<>(id, "order"), 1L, Long::sum);
            }
        }
        else if (!"null".equals(action.getPay_category_ids())) {
            // Pay action: comma-separated list of category ids.
            for (String id : action.getPay_category_ids().split(",")) {
                map1.merge(new Tuple2<>(id, "pay"), 1L, Long::sum);
            }
        }
    }

    /** Folds another partition's counts into this accumulator (driver-side merge). */
    @Override
    public void merge(AccumulatorV2<UserVisitAction, Map<Tuple2<String, String>, Long>> other) {
        for (Map.Entry<Tuple2<String, String>, Long> entry : other.value().entrySet()) {
            map1.merge(entry.getKey(), entry.getValue(), Long::sum);
        }
    }

    /** Current accumulated counts, keyed by (categoryId, actionType). */
    @Override
    public Map<Tuple2<String, String>, Long> value() {
        return map1;
    }
}

public class Top10 implements Serializable {

    /**
     * Convenience entry point: creates a local SparkContext, runs the job and
     * guarantees the context is stopped even if the job throws.
     *
     * @return the top-10 categories as {@link BusinessData} rows
     */
    public static List<BusinessData> top10() {
        SparkConf conf = new SparkConf().setAppName("Top10").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            return new Top10().sparkRun(sc);
        } finally {
            // Always release the context, even when sparkRun fails.
            sc.stop();
        }
    }

    /**
     * Computes the 10 hottest product categories from the user-visit log,
     * ranked by click count, then order count, then payment count (all
     * descending).
     *
     * @param sc an already-started Spark context; this method does NOT stop it
     * @return at most 10 {@link BusinessData} rows, best category first
     */
    public List<BusinessData> sparkRun(JavaSparkContext sc) {
        // 1. Read the raw log file (one underscore-delimited record per line).
        JavaRDD<String> dataRDD = sc.textFile("src/datas/user_visit_action.txt");

        // 2. Split each line and wrap the 13 fields in a UserVisitAction.
        JavaRDD<UserVisitAction> actionRDD = dataRDD.map(line -> {
            String[] fields = line.split("_");
            return new UserVisitAction(
                    fields[0],
                    Long.parseLong(fields[1]),
                    fields[2],
                    Long.parseLong(fields[3]),
                    fields[4],
                    fields[5],
                    Long.parseLong(fields[6]),
                    Long.parseLong(fields[7]),
                    fields[8],
                    fields[9],
                    fields[10],
                    fields[11],
                    Long.parseLong(fields[12])
            );
        });

        // 3. Create and register the custom accumulator.
        CategoryCountAccumulator acc = new CategoryCountAccumulator();
        sc.sc().register(acc, "myAcc");

        // 4. Feed every action into the accumulator on the executors.
        actionRDD.foreach(action -> acc.add(action));

        // 5. Read the merged result back on the driver.
        Map<Tuple2<String, String>, Long> accMap = acc.value();

        // 6. Group the (categoryId, actionType) counts by category id.
        Map<String, Map<Tuple2<String, String>, Long>> groupMap = new HashMap<>();
        for (Map.Entry<Tuple2<String, String>, Long> entry : accMap.entrySet()) {
            Tuple2<String, String> key = entry.getKey();
            groupMap.computeIfAbsent(key._1, k -> new HashMap<>()).put(key, entry.getValue());
        }

        // 7. Flatten each category's counts into a CategoryCountInfo.
        List<CategoryCountInfo> categoryCountInfoList = new ArrayList<>();
        for (Map.Entry<String, Map<Tuple2<String, String>, Long>> entry : groupMap.entrySet()) {
            String id = entry.getKey();
            Map<Tuple2<String, String>, Long> counts = entry.getValue();
            categoryCountInfoList.add(new CategoryCountInfo(
                    id,
                    counts.getOrDefault(new Tuple2<>(id, "click"), 0L),
                    counts.getOrDefault(new Tuple2<>(id, "order"), 0L),
                    counts.getOrDefault(new Tuple2<>(id, "pay"), 0L)));
        }

        // 8. Sort descending by click, then order, then pay count, and keep the top 10.
        categoryCountInfoList.sort((left, right) -> {
            int byClick = Long.compare(right.getClickCount(), left.getClickCount());
            if (byClick != 0) {
                return byClick;
            }
            int byOrder = Long.compare(right.getOrderCount(), left.getOrderCount());
            if (byOrder != 0) {
                return byOrder;
            }
            return Long.compare(right.getPayCount(), left.getPayCount());
        });
        List<CategoryCountInfo> top10List =
                categoryCountInfoList.size() > 10 ? categoryCountInfoList.subList(0, 10) : categoryCountInfoList;

        // 9. Map category ids to display names (fall back to the raw id) and
        //    convert to the BusinessData transport objects.
        Map<String, String> productIdToNameMap = ProductIdToNameMapEnum.INSTANCE.getProductIdToNameMap();
        List<BusinessData> businessData = new ArrayList<>(top10List.size());
        for (CategoryCountInfo info : top10List) {
            BusinessData data = new BusinessData();
            data.setCategory(productIdToNameMap.getOrDefault(info.getCategoryId(), info.getCategoryId()));
            data.setClickCount(info.getClickCount());
            data.setOrderCount(info.getOrderCount());
            data.setPaymentCount(info.getPayCount());
            businessData.add(data);
        }
        return businessData;
    }
}